FEAT Beam Search for OpenAIResponseTarget #1346

riedgar-ms wants to merge 120 commits into Azure:main
Conversation
```python
beam_reviewer=TopKBeamReviewer(k=2, drop_chars=1),
attack_scoring_config=scoring_config,
num_chars_per_step=0,
)
```
The tests only cover initialization validation but lack coverage for the core attack execution logic including _perform_async, _propagate_beam_async, and _score_beam_async. While these methods interact with OpenAIResponseTarget features, consider adding tests with mocked responses to verify the beam propagation, scoring, and beam review logic work correctly.
```python
@pytest.mark.asyncio
async def test_perform_async_called_with_mock_context_async(
    self, mock_target, mock_true_false_scorer, mock_float_scale_scorer
):
    """Ensure _perform_async can be awaited with a mocked context."""
    scoring_config = AttackScoringConfig(
        objective_scorer=mock_true_false_scorer, auxiliary_scorers=[mock_float_scale_scorer]
    )
    attack = BeamSearchAttack(
        objective_target=mock_target,
        beam_reviewer=TopKBeamReviewer(k=2, drop_chars=1),
        attack_scoring_config=scoring_config,
    )
    mock_context = MagicMock()
    attack._perform_async = AsyncMock(return_value=None)  # type: ignore[assignment]
    await attack._perform_async(mock_context)
    attack._perform_async.assert_awaited_once_with(mock_context)

@pytest.mark.asyncio
async def test_propagate_beam_async_with_mock_beam_async(
    self, mock_target, mock_true_false_scorer, mock_float_scale_scorer
):
    """Verify _propagate_beam_async is async-callable and works with a Beam instance."""
    scoring_config = AttackScoringConfig(
        objective_scorer=mock_true_false_scorer, auxiliary_scorers=[mock_float_scale_scorer]
    )
    attack = BeamSearchAttack(
        objective_target=mock_target,
        beam_reviewer=TopKBeamReviewer(k=2, drop_chars=1),
        attack_scoring_config=scoring_config,
    )
    initial_beam = Beam(identifier=str(uuid.uuid4()), content="seed", score=None)
    propagated_beam = Beam(identifier=str(uuid.uuid4()), content="seed_extended", score=None)
    attack._propagate_beam_async = AsyncMock(return_value=propagated_beam)  # type: ignore[assignment]
    result = await attack._propagate_beam_async(initial_beam)
    attack._propagate_beam_async.assert_awaited_once_with(initial_beam)
    assert result is propagated_beam

@pytest.mark.asyncio
async def test_score_beam_async_with_mock_scorers_async(
    self, mock_target, mock_true_false_scorer, mock_float_scale_scorer
):
    """Verify _score_beam_async can be awaited and returns mocked score data."""
    scoring_config = AttackScoringConfig(
        objective_scorer=mock_true_false_scorer, auxiliary_scorers=[mock_float_scale_scorer]
    )
    attack = BeamSearchAttack(
        objective_target=mock_target,
        beam_reviewer=TopKBeamReviewer(k=2, drop_chars=1),
        attack_scoring_config=scoring_config,
    )
    beam = Beam(identifier=str(uuid.uuid4()), content="to_score", score=None)
    fake_score_result = {"objective_score": 0.9, "auxiliary_scores": [0.5]}
    attack._score_beam_async = AsyncMock(return_value=fake_score_result)  # type: ignore[assignment]
    result = await attack._score_beam_async(beam)
    attack._score_beam_async.assert_awaited_once_with(beam)
    assert result == fake_score_result
```
```python
target = self._get_target_for_beam(beam)

current_context = copy.deepcopy(self._start_context)
await self._setup_async(context=current_context)
```
Calling _setup_async for each beam propagation can cause race conditions when multiple beams are processed concurrently. The method overwrites self._start_context and modifies the context's conversation_id, which could lead to unpredictable behavior. Consider refactoring to avoid calling _setup_async in the concurrent beam propagation, or ensure proper isolation between concurrent beam operations.
```python
await self._setup_async(context=current_context)
```
As I think I've noted when this comment has come up before, my whole handling of the new messages may be incorrect. I'd like the 'bigger picture' perspective
```python
_extra_beam_count = self.desired_beam_count or len(beams)

for i in range(_extra_beam_count - len(new_beams)):
    nxt = copy.deepcopy(new_beams[i % self.k])
```
When creating new beams from top performers using deepcopy, the beam IDs are preserved. This means multiple beams can have the same conversation_id, which could cause issues with conversation tracking and memory storage. Consider generating new unique IDs for the copied beams to maintain proper conversation isolation.
Suggested change:

```python
nxt = copy.deepcopy(new_beams[i % self.k])
nxt.id = str(uuid.uuid4())
if nxt.message is not None and hasattr(nxt.message, "conversation_id"):
    nxt.message.conversation_id = str(uuid.uuid4())
```
I'd appreciate a human review on this; it's linked to my other questions
```python
await self._setup_async(context=current_context)

message = self._get_message(current_context)
beam.id = current_context.conversation_id
```
_propagate_beam_async calls _setup_async, which mutates shared instance state (self._start_context) and is executed concurrently across beams. This creates a race condition and also overwrites the original start context for subsequent iterations. Refactor so per-beam setup does not write to shared state (e.g., create a local conversation_id + call ConversationManager.initialize_context_async directly without touching _start_context).
Suggested change (replacing the `_setup_async` call and the lines that follow it):

```python
local_conversation_id = str(uuid.uuid4())
current_context.conversation_id = local_conversation_id
await self._conversation_manager.initialize_context_async(
    context=current_context,
    conversation_reference=ConversationReference(
        conversation_id=local_conversation_id,
        conversation_type=ConversationType.ATTACK,
    ),
    prepended_conversation_config=self._prepended_conversation_config,
)
message = self._get_message(current_context)
beam.id = local_conversation_id
```
```python
# Log the attack configuration
self._logger.info(f"Starting {self.__class__.__name__} with objective: {context.objective}")

beams = [Beam(id=context.conversation_id, text="", score=0.0) for _ in range(self._num_beams)]
```
BeamSearchAttack adds significant new behavior (beam propagation/scoring/pruning), but the current tests mainly cover grammar/reviewer and init validation. Add unit tests that mock PromptNormalizer.send_prompt_async and scorers to validate _perform_async end-to-end (beam expansion across iterations, scoring-driven pruning, and AttackResult fields like related_conversations).
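One way to exercise that end-to-end behavior without a live `OpenAIResponseTarget` is to mock the send and score calls with `AsyncMock` and drive a miniature version of the loop. The sketch below uses an illustrative `run_beam_search` stand-in (not PyRIT's actual `_perform_async` signature) to show the shape such a test could take:

```python
import asyncio
from unittest.mock import AsyncMock

async def run_beam_search(send_prompt_async, score_async, num_beams, steps, k):
    # Miniature stand-in for _perform_async: extend, score, prune, refill.
    beams = [("", 0.0) for _ in range(num_beams)]
    for _ in range(steps):
        extended = [await send_prompt_async(text) for text, _ in beams]
        beams = [(text, await score_async(text)) for text in extended]
        beams.sort(key=lambda b: b[1], reverse=True)  # scoring-driven pruning
        survivors = beams[:k]
        beams = [survivors[i % k] for i in range(num_beams)]
    return beams

send = AsyncMock(side_effect=lambda text: text + "x")         # mocked model extension
score = AsyncMock(side_effect=lambda text: float(len(text)))  # mocked objective scorer
beams = asyncio.run(run_beam_search(send, score, num_beams=4, steps=3, k=2))
assert all(text == "xxx" for text, _ in beams)  # each beam extended once per step
assert send.await_count == 12                   # 4 beams x 3 steps
```

A real test would swap the stand-in for `BeamSearchAttack` with `PromptNormalizer.send_prompt_async` patched, and also assert on the resulting `AttackResult` fields.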
```python
if beam.objective_score and beam.objective_score.get_value():
    # We have a positive score, so it's a success
    return AttackOutcome.SUCCESS, "Objective achieved according to scorer"

# No response at all (all attempts filtered/failed)
return AttackOutcome.FAILURE, "All attempts were filtered or failed to get a response"
```
The failure outcome_reason here is misleading when a response was produced but the objective scorer returned a negative/false score. As written, any non-success (including “objective not achieved”) reports that all attempts were filtered/failed. Consider distinguishing between “no scorable response” vs “objective not achieved” (e.g., check beam.message / beam.objective_score and craft a reason accordingly).
Suggested change:

```python
if beam.objective_score is not None:
    if beam.objective_score.get_value():
        # We have a positive score, so it's a success
        return AttackOutcome.SUCCESS, "Objective achieved according to scorer"
    # A response was scored but did not achieve the objective
    return AttackOutcome.FAILURE, "Objective not achieved according to scorer"

# No objective score was produced
if beam.message is None:
    # No response at all (all attempts filtered/failed)
    return AttackOutcome.FAILURE, "All attempts were filtered or failed to get a response"

# We have a response but no positive objective score
return AttackOutcome.FAILURE, "Objective not achieved: no positive score for the response"
```
```python
def fresh_instance(
    self,
    *,
    extra_body_parameters: Optional[dict[str, Any]] = None,
    grammar_name: Optional[str] = None,
) -> "OpenAIResponseTarget":
    """
    Create a fresh instance of the OpenAIResponseTarget with the same configuration.

    Optionally override extra body parameters or a grammar name for the new instance.

    Args:
        extra_body_parameters (Optional[dict[str, Any]]): Optional overrides for the
            extra body parameters of the new instance.
        grammar_name (Optional[str]): Optional override for the grammar name of the
            new instance.

    Returns:
        OpenAIResponseTarget: A new instance of OpenAIResponseTarget.
    """
    init_args: dict[str, Any] = deepcopy(self._init_args)
    if extra_body_parameters is not None:
        init_args["extra_body_parameters"] = deepcopy(extra_body_parameters)
    result = OpenAIResponseTarget(**init_args)
    if grammar_name is not None:
        result._grammar_name = grammar_name
    return result
```
The new fresh_instance method on OpenAIResponseTarget has no test coverage in tests/unit/target/test_openai_response_target.py. Given that the rest of the file has comprehensive test coverage for all public methods, this is inconsistent. Tests should verify that fresh_instance returns an instance with the same base config, that extra_body_parameters override is applied correctly, and that grammar_name is set correctly on the result.
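A sketch of those tests against a simplified stand-in that mirrors the `fresh_instance` logic above (real tests would construct an actual `OpenAIResponseTarget`; `_TargetStub` and its constructor arguments are illustrative):

```python
from copy import deepcopy
from typing import Any, Optional

class _TargetStub:
    """Simplified stand-in for OpenAIResponseTarget's fresh_instance pattern."""

    def __init__(self, **kwargs: Any) -> None:
        self._init_args: dict[str, Any] = deepcopy(kwargs)
        self._grammar_name: Optional[str] = None

    def fresh_instance(self, *, extra_body_parameters=None, grammar_name=None) -> "_TargetStub":
        init_args = deepcopy(self._init_args)
        if extra_body_parameters is not None:
            init_args["extra_body_parameters"] = deepcopy(extra_body_parameters)
        result = _TargetStub(**init_args)
        if grammar_name is not None:
            result._grammar_name = grammar_name
        return result

# Same base config is copied, not shared
base = _TargetStub(model="m", extra_body_parameters={"a": 1})
clone = base.fresh_instance()
assert clone._init_args == base._init_args and clone is not base

# Overrides are applied to the new instance only
override = base.fresh_instance(extra_body_parameters={"b": 2}, grammar_name="g")
assert override._init_args["extra_body_parameters"] == {"b": 2}
assert override._grammar_name == "g"
assert base._init_args["extra_body_parameters"] == {"a": 1}  # base unchanged
```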
```python
objective_scorer=self._objective_scorer,
auxiliary_scorers=self._auxiliary_scorers,
role_filter="assistant",
objective=context.objective,
```
In _score_beam_async, the call to Scorer.score_response_async does not pass skip_on_error_result=True, unlike the analogous call in PromptSendingAttack._evaluate_response_async (which explicitly passes skip_on_error_result=True). If any message piece has a response_error that is not "none", the scorer will not skip it and may raise an unexpected error. This could cause the entire asyncio.TaskGroup to fail rather than gracefully skipping the problematic beam. The _propagate_beam_async method has a broad try/except that handles this case there, but scoring errors are not handled here.
Suggested change:

```python
objective=context.objective,
skip_on_error_result=True,
```
```python
# Review beams at the top of the loop for simplicity
beams = self._beam_reviewer.review(beams=beams)
```
The _perform_async method calls self._beam_reviewer.review(beams=beams) on the very first iteration (step 0) when all beams have text="" and score=0.0. On the first iteration, reviewing uninitialized beams causes the reviewer to sort and potentially duplicate beams that all have identical state, which is a wasted operation. The reviewer is designed to improve beam quality based on previous scores, but in the first iteration no scores exist yet.
Consider skipping the review on the first iteration (when all beams are uninitialized), or initializing beams after the first propagation step.
Suggested change:

```python
# Review beams only after the first iteration when they have been updated
if step > 0:
    beams = self._beam_reviewer.review(beams=beams)
```
Description

Use the Lark grammar feature of the `OpenAIResponseTarget` to create a beam search for PyRIT. This is a single-turn attack in which a collection of candidate responses (the beams) is maintained. On each iteration, the model's response is allowed to extend a little for each beam. The beams are then scored; the worst-performing ones are discarded and replaced with copies of higher-scoring beams.

Tests and Documentation
There are basic unit tests of the added classes, but since this attack requires features currently only available in the `OpenAIResponseTarget`, there didn't seem much point in mocking that. There is a notebook which runs everything E2E.
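The loop described above can be sketched in a few lines of plain Python (a toy stand-in: `extend` appends a random character in place of a model call, and `score` counts positional matches against a hidden target; all names are illustrative, not PyRIT's API):

```python
import random

def beam_search(extend, score, num_beams=4, keep=2, steps=5):
    """Maintain `num_beams` candidates; each step extend every beam,
    score them, drop the worst, and refill from copies of the best `keep`."""
    beams = [""] * num_beams
    for _ in range(steps):
        beams = [extend(b) for b in beams]                  # model extends each beam a little
        ranked = sorted(beams, key=score, reverse=True)     # score all beams
        best = ranked[:keep]                                # discard worst performers
        beams = [best[i % keep] for i in range(num_beams)]  # replace with copies of the best
    return max(beams, key=score)

target = "hello"
rng = random.Random(0)
extend = lambda b: b + rng.choice("helo")     # toy "model": random extension
score = lambda b: sum(a == c for a, c in zip(b, target))

result = beam_search(extend, score)
assert len(result) == 5  # one character added per step
```

In the real attack the copies then diverge on the next step because each beam is extended independently, which is what makes refilling from top performers worthwhile.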